
Xskipper - Extensible Data Skipping Framework for Apache Spark

In this notebook, we demonstrate how to use the Xskipper Python API.

Data skipping can significantly boost the performance of SQL queries by skipping over irrelevant data objects or files based on summary metadata associated with each object.
For each object's column, the summary metadata might include minimum and maximum values, a list or bloom filter of the appearing values, or other metadata which succinctly represents the data in that column. This metadata is used during query evaluation to skip over objects which have no relevant data.
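As a rough illustration (not Xskipper's implementation), the sketch below computes this kind of per-object summary metadata in plain Python. Objects are modeled as in-memory lists of rows keyed by an illustrative object name, and `summarize` is a hypothetical helper that records the minimum, maximum, and distinct values of one column for each object.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def summarize(objects: Dict[str, List[Row]], column: str) -> Dict[str, Dict[str, Any]]:
    """Compute hypothetical per-object summary metadata for one column."""
    metadata: Dict[str, Dict[str, Any]] = {}
    for name, rows in objects.items():
        # Ignore nulls; an object with only nulls gets no min/max
        values = [row[column] for row in rows if row.get(column) is not None]
        metadata[name] = {
            "min": min(values) if values else None,
            "max": max(values) if values else None,
            "distinct": set(values),
        }
    return metadata


# Two illustrative objects, mirroring the sample dataset used later on
objects = {
    "dt=2017-07-07": [{"temp": 20.0, "city": "Tel-Aviv"}],
    "dt=2017-07-08": [{"temp": 30.0, "city": "Jerusalem"}],
}
temp_md = summarize(objects, "temp")
city_md = summarize(objects, "city")
print(temp_md["dt=2017-07-08"]["min"], city_md["dt=2017-07-07"]["distinct"])
```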

To use this feature, you need to create indexes on one or more columns of the data set. Once indexed, Spark SQL queries can benefit from data skipping. In general, you should index the columns which are queried most often in the WHERE clause.

Note that all Spark native data formats are supported, including Parquet, ORC, CSV, JSON and Avro. Data skipping is a performance optimization feature, meaning the use of data skipping does not affect the content of the query results.

Setup

In this example, we set a JVM-wide parameter with a base path under which all the indexes are stored.
Note that the metadata can be stored on the same storage system as the data, but not under the same path.
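The rule that the metadata must not sit under the data path amounts to a path-prefix check. This is a minimal sketch under stated assumptions: `is_under` and `md_location_ok` are hypothetical helpers (not part of Xskipper), and `mybucket` is a placeholder bucket name.

```python
def is_under(path: str, base: str) -> bool:
    """True if `path` equals `base` or lies below it (compared on '/' boundaries)."""
    norm_base = base.rstrip("/") + "/"
    return (path.rstrip("/") + "/").startswith(norm_base)


def md_location_ok(data_path: str, md_path: str) -> bool:
    """Hypothetical check: metadata may share the storage system, but not the data path."""
    return not is_under(md_path, data_path)


data_path = "cos://mybucket.service/data"  # placeholder bucket
print(md_location_ok(data_path, "cos://mybucket.service/metadata"))  # True
print(md_location_ok(data_path, "cos://mybucket.service/data/_md"))  # False
```

Comparing on `/` boundaries matters: a plain `startswith` without the trailing slash would wrongly treat `.../data` as a prefix of `.../database`.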

In the following examples, we use IBM Cloud Object Storage to store both the data and the metadata.
(The cells below assume the credentials were already set. For more information about setting credentials when using IBM Cloud Object Storage, see Stocator Storage Connector.)

At query time, the metadata for the dataset is looked up in this location.

For more configuration options, see Data skipping configuration options.

[3]
from xskipper import Xskipper

# The base location to store all metadata
# TODO: change to your metadata location
md_base_location = "cos://{0}.service/{1}".format(credentials['BUCKET'], "metadata")

# Configure the JVM-wide parameters,
# including the identifier class for IBM Cloud Object Storage
conf = {
    "io.xskipper.parquet.mdlocation": md_base_location,
    "io.xskipper.parquet.mdlocation.type": "EXPLICIT_BASE_PATH_LOCATION",
    "io.xskipper.identifierclass": "io.xskipper.utils.identifier.IBMCOSIdentifier",
}
Xskipper.setConf(spark, conf)

Indexing a dataset

Creating a sample Dataset

First, let's create a sample dataset that will be used throughout this notebook.

[4]
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# TODO: change to your data location
dataset_location = "cos://{0}.service/{1}".format(credentials['BUCKET'], "data")

df_schema = StructType([StructField("dt", StringType(), True),
                        StructField("temp", DoubleType(), True),
                        StructField("city", StringType(), True),
                        StructField("vid", StringType(), True)])

data = [("2017-07-07", 20.0, "Tel-Aviv", "a"), ("2017-07-08", 30.0, "Jerusalem", "b")]

df = spark.createDataFrame(data, schema=df_schema) 

# use partitionBy to make sure we have two objects
df.write.partitionBy("dt").mode("overwrite").parquet(dataset_location)

# read the dataset back from storage
reader = spark.read.format("parquet")
df = reader.load(dataset_location)
df.show(10, False)
+----+---------+---+----------+
|temp|city     |vid|dt        |
+----+---------+---+----------+
|30.0|Jerusalem|b  |2017-07-08|
|20.0|Tel-Aviv |a  |2017-07-07|
+----+---------+---+----------+

Indexing

To create data skipping indexes on a dataset, decide which columns to index, then choose an index type for each column. These choices depend on your workload and data. We recommend selecting columns that frequently appear in your workload's query predicates.
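One way to act on this recommendation is to count how often each column appears in the predicates of a representative workload. The sketch assumes you already have, for each query, the list of columns referenced in its WHERE clause (`workload_predicates` is hypothetical data); extracting those columns from real SQL is out of scope here.

```python
from collections import Counter

# Hypothetical workload: columns referenced in each query's WHERE clause
workload_predicates = [
    ["temp"],
    ["city"],
    ["temp", "city"],
    ["vid"],
    ["temp"],
]

# Count each column at most once per query, then rank by frequency
counts = Counter(col for cols in workload_predicates for col in set(cols))
print(counts.most_common(2))  # [('temp', 3), ('city', 2)]
```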

The following index types are supported out of the box:

  1. Min/max – stores the minimum and maximum values for a column. Applies to all types except complex types.

  2. Value list – stores the list of values appearing in a column. Applies to all types except complex types.

  3. Bloom filter – stores a Bloom filter of the values appearing in a column. Applies to ByteType, StringType, LongType, IntegerType, and ShortType.

    Rule of thumb

  • Choose value list if the number of distinct values in an object is typically much smaller than the total number of values in that object.
  • Bloom filters are recommended for columns with high cardinality (on such columns, a value list index can get as big as the column itself).
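The rule of thumb can be sketched as a function of an object's distinct-value ratio. The 10% threshold and the `suggest_index` helper are assumptions for illustration only; in Xskipper you choose index types explicitly via `indexBuilder()`, and this sketch ignores the Bloom filter type restrictions listed above.

```python
from typing import Hashable, Optional, Sequence


def suggest_index(values: Sequence[Optional[Hashable]], ratio_threshold: float = 0.1) -> str:
    """Hypothetical helper applying the rule of thumb to one object's column values."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return "none"
    distinct_ratio = len(set(non_null)) / len(non_null)
    # Few distinct values relative to the row count -> a compact value list
    return "valuelist" if distinct_ratio <= ratio_threshold else "bloomfilter"


low_card = ["Tel-Aviv"] * 500 + ["Jerusalem"] * 500  # 2 distinct values in 1000 rows
high_card = [f"id-{i}" for i in range(1000)]  # 1000 distinct values in 1000 rows
print(suggest_index(low_card), suggest_index(high_card))  # valuelist bloomfilter
```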

Xskipper also lets you define custom (create-your-own) data skipping indexes and specify how to apply them at query time. For more details, see the Xskipper documentation.

[5]
# create Xskipper instance for the sample dataset
xskipper = Xskipper(spark, dataset_location)

# remove index if exists
if xskipper.isIndexed():
    xskipper.dropIndex()

xskipper.indexBuilder() \
        .addMinMaxIndex("temp") \
        .addValueListIndex("city") \
        .addBloomFilterIndex("vid") \
        .build(reader) \
        .show(10, False)
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|2                |0                  |
+-------+-----------------+-------------------+

View the created index status

To view the data skipping indexes that exist on a dataset, together with their status:

[6]
xskipper.describeIndex(reader).show()
+-------------------------+---------------------------------------------------------+--------------------+
|Data Skipping Index Stats|cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data|             Comment|
+-------------------------+---------------------------------------------------------+--------------------+
|                   Status|                                               Up to date|                    |
|     Total objects ind...|                                                        2|                    |
|     # Metadata proper...|                                                         |                    |
|        Metadata location|                                     cos://guyx27snote...|                    |
|      # Index information|                                                         |                    |
|             # Index type|                                                  Columns|              Params|
|                   minmax|                                                     temp|                    |
|                valuelist|                                                     city|                    |
|              bloomfilter|                                                      vid|io.xskipper.index...|
+-------------------------+---------------------------------------------------------+--------------------+

List Indexed datasets

To view all indexed datasets under the current base location:

[7]
Xskipper.listIndexes(spark).show(10, False)
+---------------------------------------------------------+---------------------------------------------------------------------+-------------+
|Dataset                                                  |Index type                                                           |Index columns|
+---------------------------------------------------------+---------------------------------------------------------------------+-------------+
|# Metadatastore Manager parameters                       |                                                                     |             |
|Metadata base path                                       |cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp.service/metadata|             |
|cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data|minmax                                                               |temp         |
|                                                         |valuelist                                                            |city         |
|                                                         |bloomfilter                                                          |vid          |
+---------------------------------------------------------+---------------------------------------------------------------------+-------------+

Using the Data Skipping Indexes

Enable/Disable Xskipper

Xskipper provides APIs to enable or disable index usage with Spark.

When enabled, Xskipper optimization rules become visible to the Apache Spark optimizer and are used in query optimization and execution. When disabled, Xskipper optimization rules no longer apply during query optimization. Note that disabling Xskipper has no impact on created indexes; they remain intact.

[8]
# Enable Xskipper
Xskipper.enable(spark)

# Disable Xskipper
Xskipper.disable(spark)

# You can use the following to check whether Xskipper is enabled
if not Xskipper.isEnabled(spark):
    Xskipper.enable(spark)

Running Queries

Once Xskipper has been enabled, you can run queries as usual (using either SQL or the DataFrame API) and benefit from data skipping.

Reading the Dataset and creating a temporary view

[9]
df = reader.load(dataset_location)
df.createOrReplaceTempView("sample")

Example query using Min/max index

The min/max index skips the one dataset object whose temp values are all >= 30.

[10]
spark.sql("select * from sample where temp < 30").show()
+----+--------+---+----------+
|temp|    city|vid|        dt|
+----+--------+---+----------+
|20.0|Tel-Aviv|  a|2017-07-07|
+----+--------+---+----------+
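The decision behind this result can be sketched as follows. An object is skipped only when its [min, max] range proves that no row can satisfy the predicate, which is why skipping does not change the query result. `may_match` is a hypothetical helper rather than Xskipper's code, and the per-object values come from the sample dataset (one row per object).

```python
from typing import Dict, List, Tuple


def may_match(col_min: float, col_max: float, op: str, literal: float) -> bool:
    """Return False only if no value in [col_min, col_max] can satisfy `col op literal`."""
    if op == "<":
        return col_min < literal
    if op == "<=":
        return col_min <= literal
    if op == ">":
        return col_max > literal
    if op == ">=":
        return col_max >= literal
    if op == "=":
        return col_min <= literal <= col_max
    return True  # unsupported operator: be conservative and never skip


# temp values per object in the sample dataset (one row per partition)
temps: Dict[str, List[float]] = {"dt=2017-07-07": [20.0], "dt=2017-07-08": [30.0]}
minmax: Dict[str, Tuple[float, float]] = {obj: (min(v), max(v)) for obj, v in temps.items()}

to_scan = [obj for obj, (lo, hi) in minmax.items() if may_match(lo, hi, "<", 30.0)]
print(to_scan)  # ['dt=2017-07-07']

# Skipping must not change the answer: compare with a full scan
full = sorted(t for v in temps.values() for t in v if t < 30.0)
skipped = sorted(t for obj in to_scan for t in temps[obj] if t < 30.0)
assert full == skipped
```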
Inspecting query skipping stats

You can inspect the data skipping statistics for the latest query using the following API:

[11]
Xskipper.getLatestQueryAggregatedStats(spark).show(10, False)
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+

Note: The above returns the accumulated data skipping statistics for all of the datasets that were involved in the query.
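The counters in the stats output translate directly into skipping ratios. The sketch below computes them from the numbers shown above. Treating the aggregated stats as the sum of per-dataset counters is an assumption consistent with the note, and `aggregate` is a hypothetical helper, not an Xskipper API.

```python
from typing import Dict, List

COUNTERS = ("skipped_Bytes", "skipped_Objs", "total_Bytes", "total_Objs")


def aggregate(per_dataset: List[Dict[str, int]]) -> Dict[str, int]:
    """Assumed aggregation: sum each counter over the datasets read by a query."""
    return {key: sum(row[key] for row in per_dataset) for key in COUNTERS}


# Counters copied from the stats output above (a single dataset in this query)
sample_stats = {"skipped_Bytes": 903, "skipped_Objs": 1, "total_Bytes": 1797, "total_Objs": 2}
agg = aggregate([sample_stats])
byte_pct = 100 * agg["skipped_Bytes"] / agg["total_Bytes"]
obj_pct = 100 * agg["skipped_Objs"] / agg["total_Objs"]
print(f"skipped {byte_pct:.1f}% of bytes and {obj_pct:.1f}% of objects")
# skipped 50.3% of bytes and 50.0% of objects
```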

If you want to inspect the stats for a specific dataset, call the following API on the dataset's Xskipper instance:

[12]
xskipper.getLatestQueryStats().show(10, False)
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+
Clearing the stats before running the next query

The data skipping stats accumulate over all dataset reads since the last time clearStats or reset was called. Here we clear the stats after each query to get the data skipping stats for each query separately.

[13]
Xskipper.clearStats(spark)
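Why clearing matters can be shown with a toy accumulator. `StatsAccumulator` is hypothetical and only models the accumulate-until-cleared behavior described above; the byte counts are the ones reported by the min/max and value list queries in this notebook.

```python
class StatsAccumulator:
    """Hypothetical model of skipping stats that accumulate until cleared."""

    def __init__(self) -> None:
        self.skipped_bytes = 0
        self.total_bytes = 0

    def record(self, skipped_bytes: int, total_bytes: int) -> None:
        # Every dataset read adds to the running totals
        self.skipped_bytes += skipped_bytes
        self.total_bytes += total_bytes

    def clear(self) -> None:
        self.skipped_bytes = 0
        self.total_bytes = 0


acc = StatsAccumulator()
acc.record(903, 1797)  # min/max query
acc.record(894, 1797)  # value list query, without clearing in between
print(acc.skipped_bytes, acc.total_bytes)  # 1797 3594 (two queries mixed together)

acc.clear()
acc.record(894, 1797)  # value list query on its own
print(acc.skipped_bytes, acc.total_bytes)  # 894 1797
```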

Example query using Value list index

[14]
spark.sql("select * from sample where city IN ('Jerusalem', 'Ramat-Gan')").show()
+----+---------+---+----------+
|temp|     city|vid|        dt|
+----+---------+---+----------+
|30.0|Jerusalem|  b|2017-07-08|
+----+---------+---+----------+
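The value list index enables a set-membership test. In this minimal sketch (the helper `value_list_may_match` is hypothetical, not Xskipper's API), an object is kept for `city IN (...)` only if at least one literal appears among its recorded distinct values, so only the Tel-Aviv object can be skipped here.

```python
from typing import Dict, Set


def value_list_may_match(distinct_values: Set[str], in_list: Set[str]) -> bool:
    """For `col IN (...)`, an object is skippable if none of the literals occur in it."""
    return not distinct_values.isdisjoint(in_list)


# Distinct city values per object in the sample dataset
city_values: Dict[str, Set[str]] = {"dt=2017-07-07": {"Tel-Aviv"}, "dt=2017-07-08": {"Jerusalem"}}
predicate = {"Jerusalem", "Ramat-Gan"}
to_scan = [obj for obj, vals in city_values.items() if value_list_may_match(vals, predicate)]
print(to_scan)  # ['dt=2017-07-08']
```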

Inspecting query stats

[15]
Xskipper.getLatestQueryAggregatedStats(spark).show(10, False)
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |894          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+
Clearing the stats before running the next query
[16]
Xskipper.clearStats(spark)

Example Query using Bloom filter index

[17]
spark.sql("select * from sample where vid = 'a'").show()
+----+--------+---+----------+
|temp|    city|vid|        dt|
+----+--------+---+----------+
|20.0|Tel-Aviv|  a|2017-07-07|
+----+--------+---+----------+
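A Bloom filter answers "definitely absent" or "possibly present", so an object can be skipped only on a definite miss. The describeIndex outputs in this notebook show `io.xskipper.index.bloom.fpp -> 0.01` and `io.xskipper.index.bloom.ndv -> 100000` for the vid column. The sketch below sizes a filter with the textbook formulas m = -n ln p / (ln 2)^2 bits and k = (m/n) ln 2 hash functions; whether Xskipper uses exactly these formulas or this hashing scheme is an assumption, and `BloomFilter` and `optimal_size` are hypothetical names.

```python
import hashlib
import math
from typing import Iterator, Tuple


def optimal_size(ndv: int, fpp: float) -> Tuple[int, int]:
    """Textbook Bloom filter sizing: number of bits m and of hash functions k."""
    m = math.ceil(-ndv * math.log(fpp) / (math.log(2) ** 2))
    k = max(1, round(m / ndv * math.log(2)))
    return m, k


class BloomFilter:
    """Minimal Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, value: str) -> Iterator[int]:
        # Derive k bit positions from seeded SHA-256 digests
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value: str) -> None:
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


print(optimal_size(100_000, 0.01))  # (958506, 7)

# One small filter per object for the vid column of the sample dataset
m, k = optimal_size(100, 0.01)
filters = {}
for obj, vids in {"dt=2017-07-07": ["a"], "dt=2017-07-08": ["b"]}.items():
    bf = BloomFilter(m, k)
    for vid in vids:
        bf.add(vid)
    filters[obj] = bf

# Objects whose filter reports a definite miss can be skipped for vid = 'a'
to_scan = [obj for obj, bf in filters.items() if bf.might_contain("a")]
print(to_scan)
```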

Inspecting query stats

[18]
Xskipper.getLatestQueryAggregatedStats(spark).show(10, False)
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+
Clearing the stats before running the next query
[19]
Xskipper.clearStats(spark)

Index Life Cycle

The following operations can be used to maintain the index.

Refresh Index

Over time, the index might become stale if files are added to, removed from, or modified in the dataset. To bring the index up to date, call the refresh operation, which indexes the new or modified files and removes obsolete metadata.

Note: The index will still be useful for files which didn't change since the last indexing time even without refreshing.
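The refresh logic can be sketched as a diff between what was indexed and what is currently listed. This is an assumption-level model, not Xskipper's implementation: objects are identified by a hypothetical (size, last-modified) signature with illustrative values, and a modified object is simply re-indexed, replacing its previous entry. With one appended partition it yields the same 1-of-3 figure that the index status in this section reports.

```python
from typing import Dict, List, Tuple

# Hypothetical object signature: (size in bytes, last-modified timestamp)
Signature = Tuple[int, float]


def plan_refresh(indexed: Dict[str, Signature],
                 current: Dict[str, Signature]) -> Tuple[List[str], List[str]]:
    """Return (objects to index or re-index, objects whose metadata is obsolete)."""
    to_index = sorted(p for p, sig in current.items() if indexed.get(p) != sig)
    to_remove = sorted(p for p in indexed if p not in current)
    return to_index, to_remove


# Illustrative listings: two indexed objects, then one appended partition
indexed = {"dt=2017-07-07": (900, 1.0), "dt=2017-07-08": (900, 1.0)}
current = {**indexed, "dt=2017-07-09": (900, 2.0)}

to_index, to_remove = plan_refresh(indexed, current)
print(to_index, to_remove)  # ['dt=2017-07-09'] []
pct = 100 * len(to_index) / len(current)
print(f"{pct:.0f}% of the objects ({len(to_index)} of {len(current)})")
# 33% of the objects (1 of 3)
```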

[20]
# add a new file to the dataset to simulate changes in the dataset
update_data = [("2017-07-09", 25.0, "Beer-Sheva", "c")]

update_df = spark.createDataFrame(update_data, schema=df_schema) 

# append to the existing dataset
update_df.write.partitionBy("dt").mode("append").parquet(dataset_location)

Inspecting index status:

[21]
xskipper.describeIndex(reader).show(10, False)
+--------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Data Skipping Index Stats |cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data                                                                             |Comment                                                                    |
+--------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Status                    |Out of date - please use REFRESH operation to update the index                                                                        |                                                                           |
|Total new/modified objects|33% of the objects (1 of 3)                                                                                                           |                                                                           |
|Total objects indexed     |2                                                                                                                                     |                                                                           |
|# Metadata properties     |                                                                                                                                      |                                                                           |
|Metadata location         |cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp.service/metadata/dddbdf9e4f6cbf274232fa18b35e5d4dab7f4b33830a4158a314c6203a2463f6|                                                                           |
|# Index information       |                                                                                                                                      |                                                                           |
|# Index type              |Columns                                                                                                                               |Params                                                                     |
|minmax                    |temp                                                                                                                                  |                                                                           |
|valuelist                 |city                                                                                                                                  |                                                                           |
|bloomfilter               |vid                                                                                                                                   |io.xskipper.index.bloom.fpp -> 0.01 ; io.xskipper.index.bloom.ndv -> 100000|
+--------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+

The status is out of date because there is one new object that is not yet indexed. Let's call the refresh operation:

[22]
xskipper.refreshIndex(reader).show(10, False)
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|1                |0                  |
+-------+-----------------+-------------------+

Inspecting index status following the refresh:

[23]
xskipper.describeIndex(reader).show(10, False)
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Data Skipping Index Stats|cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data                                                                             |Comment                                                                    |
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Status                   |Up to date                                                                                                                            |                                                                           |
|Total objects indexed    |3                                                                                                                                     |                                                                           |
|# Metadata properties    |                                                                                                                                      |                                                                           |
|Metadata location        |cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp.service/metadata/dddbdf9e4f6cbf274232fa18b35e5d4dab7f4b33830a4158a314c6203a2463f6|                                                                           |
|# Index information      |                                                                                                                                      |                                                                           |
|# Index type             |Columns                                                                                                                               |Params                                                                     |
|minmax                   |temp                                                                                                                                  |                                                                           |
|valuelist                |city                                                                                                                                  |                                                                           |
|bloomfilter              |vid                                                                                                                                   |io.xskipper.index.bloom.fpp -> 0.01 ; io.xskipper.index.bloom.ndv -> 100000|
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+

Drop Index

To drop the index, use the following API call:

[24]
xskipper.dropIndex()

Working with Hive table

Xskipper also supports skipping over Hive tables.

The API for working with Hive tables is similar to the API presented above, with 2 major differences:
  1. The URI used in the Xskipper constructor is the table identifier, in the form <db>.<table>.
  2. The API calls do not require a DataFrameReader.

For more info regarding the API, see the Xskipper documentation.

The metadata location for a Hive table is resolved as follows:
  1. If the table contains the parameter io.xskipper.parquet.mdlocation, its value is used as the metadata location.
  2. Otherwise, Xskipper looks up the parameter io.xskipper.parquet.mdlocation in the table's database and uses it as the base metadata location for all tables.
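The resolution order can be sketched as a simple lookup over table and database properties. This is an illustration of the two rules only: `resolve_md_location` is a hypothetical helper, the bucket paths are placeholders, and returning `None` when neither property is set is this sketch's choice, not documented Xskipper behavior. The describeIndex output for the Hive table also shows a per-table subdirectory under the base path, which the sketch does not model.

```python
from typing import Mapping, Optional, Tuple

MD_LOCATION_KEY = "io.xskipper.parquet.mdlocation"


def resolve_md_location(table_props: Mapping[str, str],
                        db_props: Mapping[str, str]) -> Optional[Tuple[str, str]]:
    """Return (source, location): the table property wins over the database one."""
    if MD_LOCATION_KEY in table_props:
        return "table", table_props[MD_LOCATION_KEY]
    if MD_LOCATION_KEY in db_props:
        return "database base path", db_props[MD_LOCATION_KEY]
    return None  # no metadata location configured


db_props = {MD_LOCATION_KEY: "cos://mybucket.service/metadata"}  # placeholder
print(resolve_md_location({}, db_props))
# ('database base path', 'cos://mybucket.service/metadata')
print(resolve_md_location({MD_LOCATION_KEY: "cos://mybucket.service/md/tbl"}, db_props))
# ('table', 'cos://mybucket.service/md/tbl')
```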

Note: During indexing, the index location parameter can be automatically added to the table properties if the Xskipper instance is configured accordingly.
For more info regarding the metadata location configuration, see the Xskipper documentation.

In this example we will set the base location in the database.

Setting the base metadata location in the database

[25]
alter_db_ddl = ("ALTER DATABASE default SET DBPROPERTIES ('io.xskipper.parquet.mdlocation'='{0}')").format(md_base_location)
spark.sql(alter_db_ddl)
DataFrame[]

Creating a sample Hive Table

Create the table

[26]
create_table_ddl = """CREATE TABLE IF NOT EXISTS tbl ( \
temp Double,
city String,
vid String,
dt String
)
USING PARQUET
PARTITIONED BY (dt)
LOCATION '{0}'""".format(dataset_location)
spark.sql(create_table_ddl)
DataFrame[]

Recover the table partitions

[27]
spark.sql("ALTER TABLE tbl RECOVER PARTITIONS")
DataFrame[]

Verify that the table was created

[28]
spark.sql("show tables").show(10, False)
spark.sql("show partitions tbl").show(10, False)
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|default |tbl      |false      |
|        |sample   |true       |
+--------+---------+-----------+

+-------------+
|partition    |
+-------------+
|dt=2017-07-07|
|dt=2017-07-08|
|dt=2017-07-09|
+-------------+

Indexing a Hive Table

Notice that we use default.tbl as the URI in the Xskipper constructor.

[29]
# create Xskipper instance for the sample Hive Table
xskipper_hive = Xskipper(spark, 'default.tbl')

# remove index if exists
if xskipper_hive.isIndexed():
    xskipper_hive.dropIndex()

xskipper_hive.indexBuilder() \
        .addMinMaxIndex("temp") \
        .addValueListIndex("city") \
        .addBloomFilterIndex("vid") \
        .build() \
        .show(10, False)
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|3                |0                  |
+-------+-----------------+-------------------+

View the created index status

The following code shows how to view the current index status, to check which indexes exist on the dataset and whether the index is up to date.

[30]
xskipper_hive.describeIndex().show(10, False)
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Data Skipping Index Stats|default.tbl                                                                                                                           |Comment                                                                    |
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Status                   |Up to date                                                                                                                            |                                                                           |
|Total objects indexed    |3                                                                                                                                     |                                                                           |
|# Metadata properties    |                                                                                                                                      |                                                                           |
|Metadata location        |cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp.service/metadata/990b4d195857e280dd03b8f703a176b7beadc1a8077ac985e9a4e18ff24de6bc|                                                                           |
|# Index information      |                                                                                                                                      |                                                                           |
|# Index type             |Columns                                                                                                                               |Params                                                                     |
|minmax                   |temp                                                                                                                                  |                                                                           |
|valuelist                |city                                                                                                                                  |                                                                           |
|bloomfilter              |vid                                                                                                                                   |io.xskipper.index.bloom.fpp -> 0.01 ; io.xskipper.index.bloom.ndv -> 100000|
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+

Enable/Disable Xskipper

Make sure Xskipper is enabled

[32]
# You can use the following to check whether Xskipper is enabled
if not Xskipper.isEnabled(spark):
    Xskipper.enable(spark)

Running Queries

Once Xskipper has been enabled, you can run queries as usual (using either SQL or the DataFrame API) and benefit from data skipping.

Example query using Min/max index

[33]
spark.sql("select * from tbl where temp < 30").show(10, False)
+----+----------+---+----------+
|temp|city      |vid|dt        |
+----+----------+---+----------+
|25.0|Beer-Sheva|c  |2017-07-09|
|20.0|Tel-Aviv  |a  |2017-07-07|
+----+----------+---+----------+
Inspecting query skipping stats

You can inspect the data skipping statistics for the latest query using the following API:

[34]
Xskipper.getLatestQueryAggregatedStats(spark).show(10, False)
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |2709       |3         |
+-------+-----------+-------------+------------+-----------+----------+

Index Life Cycle

The following operations can be used to maintain the index.

Refresh Index on Hive Table

[36]
xskipper_hive.refreshIndex().show(10, False)
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|0                |0                  |
+-------+-----------------+-------------------+

Drop Index

To drop the index, use the following API call (dropping the index also removes the index location from the table properties):

[37]
xskipper_hive.dropIndex()